/**
* @file queue_process.cpp
*
* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
*/
#include "queue_process.h"
#include <atomic>
#include "log_inner.h"
#include "runtime/mem.h"
#include "runtime/rt_mem_queue.h"
#include "runtime/dev.h"
#include "aicpu/queue_schedule/qs_client.h"

namespace acl {
    // Queue depth written into the attribute struct by acltdtSetDefaultQueueAttr.
    constexpr uint32_t RT_MQ_DEPTH_DEFAULT = 8U;
    // Upper bound (in bytes) applied to size + offset when accessing the mbuf user data area.
    constexpr uint32_t MEM_SIZE_MAX = 96U;
    // SendConnectQsMsg sets isMbufEnhanced_ when the response majorVersion is at least this value.
    constexpr uint16_t MBUF_ENHANCED_QS = 2U;
    // Value placed in the QsBindInit message that SendConnectQsMsg sends.
    constexpr uint16_t MBUF_ENHANCED_ACL = 1U;
    // Becomes true after InitQueueSchedule has called rtMemQueueInitQS successfully.
    bool QueueProcessor::isInitQs_ = false;
    // Becomes true after acltdtAllocBufData has initialised the mbuf module (or found it already initialised).
    bool QueueProcessor::isMbufInit_ = false;

    /**
     * @brief Put an mbuf into queue @p qid on device 0, retrying while the runtime reports the queue as full.
     * @param qid      id of the target queue
     * @param buf      buffer handle to enqueue, must not be null
     * @param timeout  retry budget in milliseconds; a negative value retries without a time limit
     * @return ACL_SUCCESS when the buffer was enqueued, the runtime error code for any failure other than
     *         ACL_ERROR_RT_QUEUE_FULL, ACL_ERROR_FAILURE when the retry budget is exhausted
     */
    aclError QueueProcessor::acltdtEnqueue(const uint32_t qid, const acltdtBuf buf, const int32_t timeout)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        const QueueDataMutexPtr muPtr = GetMutexForData(qid);
        ACL_CHECK_MALLOC_RESULT(muPtr);
        const uint64_t beginUs = GetTimestamp();
        const bool retryForever = (timeout < 0);
        // budget in microseconds, comparable with the values returned by GetTimestamp
        const uint64_t budgetUs = static_cast<uint64_t>(timeout) * static_cast<uint64_t>(MSEC_TO_USEC);
        while (true) {
            // the per-queue lock is held for the whole attempt, including the 1ms back-off below
            // NOTE(review): the enqueue path locks the member named muForDequeue - confirm the naming is intended
            const std::lock_guard<std::mutex> guard(muPtr->muForDequeue);
            constexpr int32_t deviceId = 0;
            const rtError_t status = rtMemQueueEnQueue(deviceId, qid, buf);
            if (status == RT_ERROR_NONE) {
                return ACL_SUCCESS;
            }
            if (status != ACL_ERROR_RT_QUEUE_FULL) {
                ACL_LOG_CALL_ERROR("[Enqueue][Queue]fail to enqueue result = %d", status);
                return status;
            }
            (void)mmSleep(1U); // sleep 1ms
            const uint64_t elapsedUs = GetTimestamp() - beginUs;
            if ((elapsedUs > budgetUs) && (!retryForever)) {
                break;
            }
        }
        return ACL_ERROR_FAILURE;
    }

    /**
     * @brief Take an mbuf from queue @p qid on device 0, retrying while the runtime reports the queue as empty.
     * @param qid      id of the source queue
     * @param buf      output location for the dequeued buffer handle, must not be null
     * @param timeout  retry budget in milliseconds; a negative value retries without a time limit
     * @return ACL_SUCCESS when a buffer was dequeued, the runtime error code for any failure other than
     *         ACL_ERROR_RT_QUEUE_EMPTY, ACL_ERROR_FAILURE when the retry budget is exhausted
     */
    aclError QueueProcessor::acltdtDequeue(const uint32_t qid, acltdtBuf *const buf, const int32_t timeout)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        const QueueDataMutexPtr muPtr = GetMutexForData(qid);
        ACL_CHECK_MALLOC_RESULT(muPtr);
        const uint64_t beginUs = GetTimestamp();
        const bool retryForever = (timeout < 0);
        // budget in microseconds, comparable with the values returned by GetTimestamp
        const uint64_t budgetUs = static_cast<uint64_t>(timeout) * static_cast<uint64_t>(MSEC_TO_USEC);
        while (true) {
            // the per-queue lock is held for the whole attempt, including the 1ms back-off below
            // NOTE(review): the dequeue path locks the member named muForEnqueue - confirm the naming is intended
            const std::lock_guard<std::mutex> guard(muPtr->muForEnqueue);
            constexpr int32_t deviceId = 0;
            const rtError_t status = rtMemQueueDeQueue(deviceId, qid, buf);
            if (status == RT_ERROR_NONE) {
                return ACL_SUCCESS;
            }
            if (status != ACL_ERROR_RT_QUEUE_EMPTY) {
                ACL_LOG_CALL_ERROR("[Dequeue][Queue]fail to dequeue result = %d", status);
                return status;
            }
            (void)mmSleep(1U); // sleep 1ms
            const uint64_t elapsedUs = GetTimestamp() - beginUs;
            if ((elapsedUs > budgetUs) && (!retryForever)) {
                break;
            }
        }
        return ACL_ERROR_FAILURE;
    }

    /**
     * @brief Placeholder for granting queue permission; this implementation only reports the feature as
     *        unsupported.
     * @param qid         unused
     * @param pid         unused
     * @param permission  unused
     * @param timeout     unused
     * @return always ACL_ERROR_FEATURE_UNSUPPORTED
     */
    aclError QueueProcessor::acltdtGrantQueue(const uint32_t qid, const int32_t pid, const uint32_t permission,
        const int32_t timeout)
    {
        static_cast<void>(qid);
        static_cast<void>(pid);
        static_cast<void>(permission);
        static_cast<void>(timeout);
        ACL_LOG_ERROR("[Unsupport][Feature]acltdtGrantQueue is not supported in this version. Please check.");
        // key/value pairs handed to the error log manager
        const char_t *keys[] = {"feature", "reason"};
        const char_t *values[] = {"acltdtGrantQueue", "please check"};
        acl::AclErrorLogManager::ReportInputErrorWithChar(acl::UNSUPPORTED_FEATURE_MSG, keys, values, 2UL);
        return ACL_ERROR_FEATURE_UNSUPPORTED;
    }

    /**
     * @brief Placeholder for attaching to a queue; this implementation only reports the feature as
     *        unsupported.
     * @param qid         unused
     * @param timeout     unused
     * @param permission  unused
     * @return always ACL_ERROR_FEATURE_UNSUPPORTED
     */
    aclError QueueProcessor::acltdtAttachQueue(const uint32_t qid, const int32_t timeout,
        uint32_t *const permission)
    {
        static_cast<void>(qid);
        static_cast<void>(permission);
        static_cast<void>(timeout);
        ACL_LOG_ERROR("[Unsupport][Feature]acltdtAttachQueue is not supported in this version. Please check.");
        // key/value pairs handed to the error log manager
        const char_t *keys[] = {"feature", "reason"};
        const char_t *values[] = {"acltdtAttachQueue", "please check"};
        acl::AclErrorLogManager::ReportInputErrorWithChar(acl::UNSUPPORTED_FEATURE_MSG, keys, values, 2UL);
        return ACL_ERROR_FEATURE_UNSUPPORTED;
    }

    /**
     * @brief Destroy queue @p qid on device 0, refusing when the queue schedule still reports routes for it.
     *
     * When a queue schedule pid can be resolved, the number of routes that use @p qid as source or
     * destination is queried first. If no pid can be resolved the route check is skipped.
     *
     * @param qid           id of the queue to destroy
     * @param isThreadMode  forwarded to GetDstInfo when resolving the queue schedule pid
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when routes are still bound, otherwise the error
     *         produced by the route query or by rtMemQueueDestroy
     */
    aclError QueueProcessor::acltdtDestroyQueueOndevice(const uint32_t qid, const bool isThreadMode)
    {
        ACL_LOG_INFO("Start to destroy queue %u", qid);
        constexpr int32_t deviceId = 0;
        size_t routeNum = 0UL;
        int32_t qsPid = 0;
        const std::lock_guard<std::recursive_mutex> lk(muForQueueCtrl_);
        const bool qsFound = (GetDstInfo(deviceId, QS_PID, qsPid, isThreadMode) == ACL_SUCCESS);
        if (qsFound) {
            ACL_LOG_INFO("find qs pid %d", qsPid);
            // reply buffer that receives the queue schedule response
            bqs::QsProcMsgRsp qsRsp = {0UL, 0, 0U, 0U, 0U, {0}};
            rtEschedEventReply_t ack = {nullptr, 0U, 0U};
            ack.buf = reinterpret_cast<char_t *>(&qsRsp);
            ack.bufLen = sizeof(qsRsp);
            // event addressed to the queue schedule process
            rtEschedEventSummary_t eventSum = {0, 0U, 0, 0U, 0U, nullptr, 0U, 0};
            eventSum.pid = qsPid;
            eventSum.grpId = bqs::BIND_QUEUE_GROUP_ID;
            eventSum.eventId = RT_MQ_SCHED_EVENT_QS_MSG; // qs EVENT_ID
            eventSum.dstEngine = static_cast<uint32_t>(RT_MQ_DST_ENGINE_CCPU_DEVICE);
            const acltdtQueueRouteQueryInfo queryInfo = {bqs::BQS_QUERY_TYPE_SRC_OR_DST, qid, qid, true, true, true};
            ACL_REQUIRES_OK(GetQueueRouteNum(&queryInfo, deviceId, eventSum, ack, routeNum));
        }
        if (routeNum > 0U) {
            ACL_LOG_ERROR("qid [%u] can not be destroyed, it need to be unbinded first.", qid);
            return ACL_ERROR_FAILURE;
        }
        ACL_REQUIRES_CALL_RTS_OK(rtMemQueueDestroy(deviceId, qid), rtMemQueueDestroy);
        // the per-queue mutexes are no longer needed once the queue is gone
        DeleteMutexForData(qid);
        ACL_LOG_INFO("successfully to execute destroy queue %u", qid);
        return ACL_SUCCESS;
    }

    /**
     * @brief Query the attributes the current process holds on queue @p qid.
     * @param deviceId    device the queue lives on
     * @param qid         queue id
     * @param permission  filled by rtMemQueueQuery with the share attributes
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when the runtime query fails
     */
    aclError QueueProcessor::GetQueuePermission(const int32_t deviceId, uint32_t qid,
        rtMemQueueShareAttr_t &permission) const
    {
        uint32_t outLen = sizeof(permission);
        const auto queryRet = rtMemQueueQuery(deviceId, RT_MQ_QUERY_QUE_ATTR_OF_CUR_PROC,
                                              &qid, sizeof(qid), &permission, &outLen);
        if (queryRet == RT_ERROR_NONE) {
            return ACL_SUCCESS;
        }
        ACL_LOG_INNER_ERROR("get queue permission failed");
        return ACL_ERROR_FAILURE;
    }

    /**
     * @brief Initialise the queue schedule on @p devId once; later calls are no-ops.
     * @param devId  device id passed to rtMemQueueInitQS
     * @return ACL_SUCCESS when already initialised or initialisation succeeds, otherwise the error
     *         produced by ACL_REQUIRES_CALL_RTS_OK for rtMemQueueInitQS
     */
    aclError QueueProcessor::InitQueueSchedule(const int32_t devId) const
    {
        if (isInitQs_) {
            return ACL_SUCCESS;
        }
        ACL_LOG_INFO("need to init queue schedule");
        ACL_REQUIRES_CALL_RTS_OK(rtMemQueueInitQS(devId, nullptr), rtMemQueueInitQS);
        // only remembered after the runtime call succeeded
        isInitQs_ = true;
        return ACL_SUCCESS;
    }

    /**
     * @brief Resolve the pid of the device-side process of the requested kind.
     *
     * In thread mode with the queue schedule already initialised, the current process pid is returned.
     * Otherwise the runtime is asked for the device pid bound to this host pid.
     *
     * @param deviceId      device (chip) id used for the lookup
     * @param type          CP_PID selects RT_DEV_PROCESS_CP1, any other value selects RT_DEV_PROCESS_QS
     * @param dstPid        receives the resolved pid
     * @param isThreadMode  see above
     * @return ACL_SUCCESS on success, otherwise the value returned by rtQueryDevPid
     */
    aclError QueueProcessor::GetDstInfo(const int32_t deviceId, const PID_QUERY_TYPE type,
        int32_t &dstPid, const bool isThreadMode) const
    {
        const bool useOwnPid = isThreadMode && isInitQs_;
        if (useOwnPid) {
            dstPid = mmGetPid();
            return ACL_SUCCESS;
        }
        rtBindHostpidInfo_t info = {0, 0U, 0U, 0};
        info.hostPid = mmGetPid();
        info.cpType = (type == CP_PID) ? RT_DEV_PROCESS_CP1 : RT_DEV_PROCESS_QS;
        info.chipId = static_cast<uint32_t>(deviceId);
        ACL_LOG_INFO("start to get dst pid, deviceId is %d, type is %d", deviceId, type);
        const auto queryRet = rtQueryDevPid(&info, &dstPid);
        if (queryRet == ACL_RT_SUCCESS) {
            ACL_LOG_INFO("get dst pid %d success, type is %d", dstPid, type);
            return ACL_SUCCESS;
        }
        // logged at info level only; callers decide whether a missing pid is an error
        ACL_LOG_INFO("can not query device pid");
        return queryRet;
    }

    /**
     * @brief Allocate an mbuf of @p size bytes, fetch its data address and zero the data area.
     * @param devPtr  receives the data address of the new mbuf
     * @param mBuf    receives the mbuf handle; it is freed again before returning on any later failure
     * @param size    number of bytes to allocate and clear
     * @return ACL_SUCCESS on success, ACL_ERROR_BAD_ALLOC when the data address cannot be obtained,
     *         otherwise the error produced by the allocation checks
     */
    static aclError AllocMBufOnDevice(void **const devPtr, void **const mBuf, const size_t size)
    {
        ACL_REQUIRES_CALL_RTS_OK(rtMbufAlloc(mBuf, size), rtMbufAlloc);
        ACL_CHECK_MALLOC_RESULT(*mBuf);
        const bool addrQueried = (rtMbufGetBuffAddr(*mBuf, devPtr) == RT_ERROR_NONE);
        if (!addrQueried) {
            (void)rtMbufFree(*mBuf);
            ACL_LOG_INNER_ERROR("[Get][mbuf]get mbuf failed.");
            return ACL_ERROR_BAD_ALLOC;
        }
        void *const payload = *devPtr;
        if (payload == nullptr) {
            (void)rtMbufFree(*mBuf);
            ACL_LOG_INNER_ERROR("[Get][mbuf]get dataPtr failed.");
            return ACL_ERROR_BAD_ALLOC;
        }
        (void)memset_s(payload, size, 0, size);
        return ACL_SUCCESS;
    }

    /**
     * @brief Send the ACL_BIND_QUEUE_INIT message to the queue schedule and record its answer.
     *
     * On success the response's retValue is stored in qsContactId_, and isMbufEnhanced_ is set when the
     * response majorVersion is at least MBUF_ENHANCED_QS.
     *
     * @param deviceId  device id passed to rtEschedSubmitEventSync
     * @param eventSum  event summary prepared by the caller; subeventId is overwritten, msg/msgLen are
     *                  used for the duration of the call and are always reset before returning
     * @param ack       reply descriptor; ack.buf is read as a bqs::QsProcMsgRsp after the call
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when the response carries a non-zero retCode,
     *         otherwise the error produced for a failed rtEschedSubmitEventSync
     */
    aclError QueueProcessor::SendConnectQsMsg(const int32_t deviceId,
                                              rtEschedEventSummary_t &eventSum,
                                              rtEschedEventReply_t &ack)
    {
        // send contact msg
        ACL_LOG_INFO("start to send contact msg");
        bqs::QsBindInit qsInitMsg = {0U, 0, 0U, MBUF_ENHANCED_ACL, {0}};
        qsInitMsg.pid = mmGetPid();
        qsInitMsg.grpId = 0U;
        eventSum.subeventId = bqs::ACL_BIND_QUEUE_INIT;
        eventSum.msgLen = sizeof(qsInitMsg);
        eventSum.msg = reinterpret_cast<char_t *>(&qsInitMsg);
        const rtError_t rtRet = rtEschedSubmitEventSync(deviceId, &eventSum, &ack);
        // qsInitMsg lives on this stack frame: detach it from the caller's summary on every path,
        // including the error returns below, so no dangling pointer is left behind
        eventSum.msgLen = 0U;
        eventSum.msg = nullptr;
        ACL_REQUIRES_CALL_RTS_OK(rtRet, rtEschedSubmitEventSync);
        bqs::QsProcMsgRsp *const rsp = reinterpret_cast<bqs::QsProcMsgRsp *>(ack.buf);
        if (rsp->retCode != 0) {
            ACL_LOG_INNER_ERROR("send connet qs failed,  ret code id %d", rsp->retCode);
            return ACL_ERROR_FAILURE;
        }
        qsContactId_ = rsp->retValue;
        if (rsp->majorVersion >= MBUF_ENHANCED_QS) {
            isMbufEnhanced_ = true;
        }
        ACL_LOG_INFO("successfully execute to SendConnectQsMsg");
        return ACL_SUCCESS;
    }

    /**
     * @brief Send a bind or unbind request for every route in @p qRouteList and copy back each route's status.
     *
     * The routes are written into an mbuf (QsRouteHead followed by one QueueRoute per entry), the mbuf is
     * enqueued into the contact queue qsContactId_, and the queue schedule is notified through a
     * synchronous event. When isMbufEnhanced_ is set the mbuf is dequeued again before it is read.
     *
     * @param qRouteList  routes to bind/unbind; the status field of every entry is updated on success
     * @param isBind      true selects ACL_BIND_QUEUE, false selects ACL_UNBIND_QUEUE
     * @param eventSum    event summary prepared by the caller; subeventId is overwritten, msg/msgLen are
     *                    reset after the submit call
     * @param ack         reply descriptor; ack.buf is read as a bqs::QsProcMsgRsp after the call
     * @return RT_ERROR_NONE on success, ACL_ERROR_FAILURE when the response retCode is non-zero or the
     *         dequeued mbuf has no usable data address, otherwise the failing runtime error code
     */
    aclError QueueProcessor::SendBindUnbindMsgOnDevice(acltdtQueueRouteList *const qRouteList,
                                                       const bool isBind,
                                                       rtEschedEventSummary_t &eventSum,
                                                       rtEschedEventReply_t &ack) const
    {
        ACL_LOG_INFO("start to send bind or unbind msg");
        // send bind or unbind msg
        const size_t routeSize = sizeof(bqs::QsRouteHead) + (qRouteList->routeList.size() * sizeof(bqs::QueueRoute));
        ACL_LOG_INFO("route size is %zu, queue route num is %zu", routeSize, qRouteList->routeList.size());
        void *devPtr = nullptr;
        void *mBuf = nullptr;
        ACL_REQUIRES_OK(AllocMBufOnDevice(&devPtr, &mBuf, routeSize));
        const uint32_t subEventId =
            isBind ? static_cast<uint32_t>(bqs::ACL_BIND_QUEUE) : static_cast<uint32_t>(bqs::ACL_UNBIND_QUEUE);
        bqs::QsRouteHead *const head = reinterpret_cast<bqs::QsRouteHead *>(devPtr);
        head->length = routeSize;
        head->routeNum = qRouteList->routeList.size();
        head->subEventId = subEventId;
        // the route entries follow the head back to back
        size_t offset = sizeof(bqs::QsRouteHead);
        for (size_t i = 0UL; i < qRouteList->routeList.size(); ++i) {
            bqs::QueueRoute *const tmp =
                reinterpret_cast<bqs::QueueRoute *>(static_cast<uint8_t *>(devPtr) + offset);
            tmp->srcId = qRouteList->routeList[i].srcId;
            tmp->dstId = qRouteList->routeList[i].dstId;
            offset += sizeof(bqs::QueueRoute);
        }
        // device need to use mbuff
        auto ret = rtMemQueueEnQueue(0, qsContactId_, mBuf);
        if (ret != RT_ERROR_NONE) {
            (void)rtMbufFree(mBuf);
            ACL_LOG_INNER_ERROR("[Call][Rts]call rtMemQueueEnQueue failed");
            return ret;
        }

        bqs::QueueRouteList bqsBindUnbindMsg = {0U, 0U, {0}};
        eventSum.subeventId = subEventId;
        eventSum.msgLen = sizeof(bqsBindUnbindMsg);
        eventSum.msg = reinterpret_cast<char_t *>(&bqsBindUnbindMsg);
        ret = rtEschedSubmitEventSync(0, &eventSum, &ack);
        // the message is a local object, never leave it referenced by the caller's summary
        eventSum.msgLen = 0U;
        eventSum.msg = nullptr;
        if (ret != RT_ERROR_NONE) {
            ACL_LOG_INNER_ERROR("call rtEschedSubmitEventSync failed,  ret code id %d", ret);
            // in enhanced mode the enqueued mbuf is not freed from this side
            if (!isMbufEnhanced_) {
                (void)rtMbufFree(mBuf);
            }
            return ret;
        }
        if (isMbufEnhanced_) {
            // after event sync mbuf need to be dequeue to be used as mbuf can not be free by enqueue side
            ACL_REQUIRES_CALL_RTS_OK(rtMemQueueDeQueue(0, qsContactId_, &mBuf), rtMemQueueDeQueue);
            // refresh the data address from the dequeued handle; never fall back to the stale pointer
            devPtr = nullptr;
            if ((rtMbufGetBuffAddr(mBuf, &devPtr) != RT_ERROR_NONE) || (devPtr == nullptr)) {
                ACL_LOG_INNER_ERROR("[Get][mbuf]get data addr of dequeued mbuf failed.");
                (void)rtMbufFree(mBuf);
                return ACL_ERROR_FAILURE;
            }
        }
        bqs::QsProcMsgRsp *const rsp = reinterpret_cast<bqs::QsProcMsgRsp *>(ack.buf);
        if (rsp->retCode != 0) {
            ACL_LOG_INNER_ERROR("bind or unbind queue route failed,  ret code id %d", rsp->retCode);
            (void)rtMbufFree(mBuf);
            return ACL_ERROR_FAILURE;
        }
        // copy the per-route status written by the queue schedule back to the caller's list
        offset = sizeof(bqs::QsRouteHead);
        for (size_t i = 0UL; i < qRouteList->routeList.size(); ++i) {
            bqs::QueueRoute *const tmp =
                reinterpret_cast<bqs::QueueRoute *>(static_cast<uint8_t *>(devPtr) + offset);
            qRouteList->routeList[i].status = tmp->status;
            ACL_LOG_INFO("route %zu, srcqid is %u, dst pid is %u, status is %d", i, qRouteList->routeList[i].srcId,
                         qRouteList->routeList[i].dstId, qRouteList->routeList[i].status);
            offset += sizeof(bqs::QueueRoute);
        }
        (void)rtMbufFree(mBuf);
        return ret;
    }

    /**
     * @brief Ask the queue schedule how many routes match @p queryInfo.
     * @param queryInfo  query description; mode, srcId and dstId are copied into the request
     * @param deviceId   device id passed to rtEschedSubmitEventSync
     * @param eventSum   event summary prepared by the caller; subeventId is overwritten, msg/msgLen are
     *                   used for the duration of the call and are always reset before returning
     * @param ack        reply descriptor; ack.buf is read as a bqs::QsProcMsgRsp after the call
     * @param routeNum   receives the response's retValue on success, untouched otherwise
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when the response carries a non-zero retCode,
     *         otherwise the error produced for a failed rtEschedSubmitEventSync
     */
    aclError QueueProcessor::GetQueueRouteNum(const acltdtQueueRouteQueryInfo *const queryInfo,
                                              const int32_t deviceId,
                                              rtEschedEventSummary_t &eventSum,
                                              rtEschedEventReply_t &ack,
                                              size_t &routeNum) const
    {
        ACL_LOG_INFO("start to get queue route num");
        bqs::QueueRouteQuery routeQuery = {0UL, 0U, 0U, 0U, 0, 0, 0UL, {0}};
        routeQuery.queryType = static_cast<uint32_t>(queryInfo->mode);
        routeQuery.srcId = queryInfo->srcId;
        routeQuery.dstId = queryInfo->dstId;

        eventSum.subeventId = bqs::ACL_QUERY_QUEUE_NUM;
        eventSum.msgLen = sizeof(routeQuery);
        eventSum.msg = reinterpret_cast<char_t *>(&routeQuery);
        const rtError_t rtRet = rtEschedSubmitEventSync(deviceId, &eventSum, &ack);
        // routeQuery lives on this stack frame: detach it from the caller's summary on every path,
        // including the error returns below, so no dangling pointer is left behind
        eventSum.msgLen = 0U;
        eventSum.msg = nullptr;
        ACL_REQUIRES_CALL_RTS_OK(rtRet, rtEschedSubmitEventSync);
        bqs::QsProcMsgRsp *const rsp = reinterpret_cast<bqs::QsProcMsgRsp *>(ack.buf);
        if (rsp->retCode != 0) {
            ACL_LOG_INNER_ERROR("get queue route num failed,  ret code id %d", rsp->retCode);
            return ACL_ERROR_FAILURE;
        }
        routeNum = rsp->retValue;
        ACL_LOG_INFO("sucessfully to get queue route num %zu.", routeNum);
        return ACL_SUCCESS;
    }

    /**
     * @brief Fetch @p routeNum routes matching @p queryInfo from the queue schedule and append them to
     *        @p qRouteList.
     *
     * The request is written into an mbuf (QsRouteHead, QueueRouteQuery, then room for routeNum
     * QueueRoute entries), enqueued into the contact queue qsContactId_, and the queue schedule is
     * notified through a synchronous event. When isMbufEnhanced_ is set the mbuf is dequeued again
     * before it is read.
     *
     * @param queryInfo   query description; mode, srcId and dstId are copied into the request
     * @param routeNum    number of routes to read back; 0 returns immediately
     * @param eventSum    event summary prepared by the caller; subeventId is overwritten, msg/msgLen are
     *                    reset after the submit call
     * @param ack         reply descriptor; ack.buf is read as a bqs::QsProcMsgRsp after the call
     * @param qRouteList  list the returned routes are appended to
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when the response retCode is non-zero or the
     *         dequeued mbuf has no usable data address, otherwise the failing runtime error code
     */
    aclError QueueProcessor::QueryQueueRoutesOnDevice(const acltdtQueueRouteQueryInfo *const queryInfo,
        const size_t routeNum, rtEschedEventSummary_t &eventSum, rtEschedEventReply_t &ack,
        acltdtQueueRouteList *const qRouteList) const
    {
        ACL_LOG_INFO("start to query queue route %zu", routeNum);
        if (routeNum == 0U) {
            return ACL_SUCCESS;
        }
        const size_t routeSize = sizeof(bqs::QsRouteHead) + sizeof(bqs::QueueRouteQuery) +
            (routeNum * sizeof(bqs::QueueRoute));
        ACL_LOG_INFO("route size is %zu, queue route num is %zu", routeSize, qRouteList->routeList.size());
        void *devPtr = nullptr;
        void *mBuf = nullptr;
        ACL_REQUIRES_OK(AllocMBufOnDevice(&devPtr, &mBuf, routeSize));
        bqs::QsRouteHead *const head = reinterpret_cast<bqs::QsRouteHead *>(devPtr);
        head->length = routeSize;
        head->routeNum = routeNum;
        head->subEventId = bqs::ACL_QUERY_QUEUE;
        // the query description follows the head directly
        bqs::QueueRouteQuery *const routeQuery =
            reinterpret_cast<bqs::QueueRouteQuery *>(static_cast<uint8_t *>(devPtr) + sizeof(bqs::QsRouteHead));
        routeQuery->queryType = static_cast<uint32_t>(queryInfo->mode);
        routeQuery->srcId = queryInfo->srcId;
        routeQuery->dstId = queryInfo->dstId;
        // device need to use mbuff
        auto ret = rtMemQueueEnQueue(0, qsContactId_, mBuf);
        if (ret != RT_ERROR_NONE) {
            (void)rtMbufFree(mBuf);
            ACL_LOG_INNER_ERROR("[Call][Rts]call rtMemQueueEnQueue failed");
            return ret;
        }
        bqs::QueueRouteList qsCommonMsg = {0U, 0U, {0}};
        eventSum.subeventId = bqs::ACL_QUERY_QUEUE;
        eventSum.msgLen = sizeof(qsCommonMsg);
        eventSum.msg = reinterpret_cast<char_t *>(&qsCommonMsg);

        ret = rtEschedSubmitEventSync(0, &eventSum, &ack);
        // the message is a local object, never leave it referenced by the caller's summary
        eventSum.msgLen = 0U;
        eventSum.msg = nullptr;
        if (ret != RT_ERROR_NONE) {
            ACL_LOG_INNER_ERROR("call rtEschedSubmitEventSync failed,  ret code id %d", ret);
            // in enhanced mode the enqueued mbuf is not freed from this side
            if (!isMbufEnhanced_) {
                (void)rtMbufFree(mBuf);
            }
            return ret;
        }
        if (isMbufEnhanced_) {
            // after event sync mbuf need to be dequeue to be used as mbuf can not be free by enqueue side
            ACL_REQUIRES_CALL_RTS_OK(rtMemQueueDeQueue(0, qsContactId_, &mBuf), rtMemQueueDeQueue);
            // refresh the data address from the dequeued handle; never fall back to the stale pointer
            devPtr = nullptr;
            if ((rtMbufGetBuffAddr(mBuf, &devPtr) != RT_ERROR_NONE) || (devPtr == nullptr)) {
                ACL_LOG_INNER_ERROR("[Get][mbuf]get data addr of dequeued mbuf failed.");
                (void)rtMbufFree(mBuf);
                return ACL_ERROR_FAILURE;
            }
        }

        bqs::QsProcMsgRsp *const rsp = reinterpret_cast<bqs::QsProcMsgRsp *>(ack.buf);
        if (rsp->retCode != 0) {
            ACL_LOG_INNER_ERROR("query queue route failed,  ret code id %d", rsp->retCode);
            (void)rtMbufFree(mBuf);
            return ACL_ERROR_FAILURE;
        }
        // the returned routes start after the head and the query description
        size_t offset = sizeof(bqs::QsRouteHead) + sizeof(bqs::QueueRouteQuery);
        for (size_t i = 0UL; i < routeNum; ++i) {
            bqs::QueueRoute *const tmp =
                reinterpret_cast<bqs::QueueRoute *>(static_cast<uint8_t *>(devPtr) + offset);
            const acltdtQueueRoute tmpQueueRoute = {tmp->srcId, tmp->dstId, tmp->status};
            qRouteList->routeList.push_back(tmpQueueRoute);
            // log the route just appended; indexing the list with i is wrong when it was not empty on entry
            ACL_LOG_INFO("route %zu, srcqid is %u, dst pid is %u, status is %d", i, tmpQueueRoute.srcId,
                         tmpQueueRoute.dstId, tmpQueueRoute.status);
            offset += sizeof(bqs::QueueRoute);
        }
        (void)rtMbufFree(mBuf);
        ACL_LOG_INFO("Successfully to execute acltdtQueryQueueRoutes, queue route is %zu",
                     qRouteList->routeList.size());
        return ACL_SUCCESS;
    }

    /**
     * @brief Find the first memory group of this process whose alloc attribute is 1 and resolve its
     *        group id via QueryGroupId. The lookup is performed once; later calls return immediately.
     * @return ACL_SUCCESS when the group id has been resolved (now or earlier), ACL_ERROR_FAILURE when
     *         the process has no group or none with alloc == 1, otherwise the error of the failing query
     */
    aclError QueueProcessor::QueryAllocGroup()
    {
        ACL_LOG_INFO("Start to QueryAllocGroup.");
        // atomic because the flag is read before muForQueryGroup_ is taken; a plain bool would be a data race
        static std::atomic<bool> isGroupQuery(false);
        if (isGroupQuery.load()) {
            return ACL_SUCCESS;
        }
        const std::lock_guard<std::mutex> lk(muForQueryGroup_);
        // re-check under the lock: another thread may have finished the query in the meantime
        if (isGroupQuery.load()) {
            return ACL_SUCCESS;
        }
        const auto pid = mmGetPid();
        rtMemGrpQueryInput_t input = {};
        input.cmd = RT_MEM_GRP_QUERY_GROUPS_OF_PROCESS;
        input.grpQueryByProc.pid = pid;
        rtMemGrpQueryOutput_t output = {};
        rtMemGrpOfProc_t outputInfo[QUERY_BUFF_GRP_MAX_NUM] = {{}};
        output.groupsOfProc = outputInfo;
        output.maxNum = QUERY_BUFF_GRP_MAX_NUM;

        ACL_REQUIRES_CALL_RTS_OK(rtMemGrpQuery(&input, &output), rtMemGrpQuery);
        const size_t grpNum = output.resultNum;
        if ((grpNum == 0U) || (output.groupsOfProc == nullptr)) {
            ACL_LOG_ERROR("[Check] grpNum is zero or groupsOfProc is nullptr, grpNum is %zu", grpNum);
            return ACL_ERROR_FAILURE;
        }
        for (size_t num = 0U; num < grpNum; num++) {
            const uint32_t alloc = output.groupsOfProc[num].attr.alloc;
            const std::string grpName(output.groupsOfProc[num].groupName);
            ACL_LOG_INFO("This proc [%d] has [%zu] group, alloc is %u, name is %s",
                         pid, grpNum, alloc, grpName.c_str());
            if (alloc == 1U) {
                ACL_REQUIRES_OK(QueryGroupId(grpName));
                // published only after qsGroupId_ has been written by QueryGroupId
                isGroupQuery.store(true);
                return ACL_SUCCESS;
            }
        }
        ACL_LOG_ERROR("[Check] has no alloc");
        return ACL_ERROR_FAILURE;
    }

    /**
     * @brief Look up the id of memory group @p grpName and store it in qsGroupId_.
     * @param grpName  name of the group to query
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when the name cannot be copied into the request,
     *         otherwise the error produced for a failed rtMemGrpQuery
     */
    aclError QueueProcessor::QueryGroupId(const std::string &grpName)
    {
        ACL_LOG_INFO("Query groupId from name = %s", grpName.c_str());
        rtMemGrpQueryInput_t input = {};
        input.cmd = RT_MEM_GRP_QUERY_GROUP_ID;
        // the request carries the name in a fixed-size buffer
        const size_t nameCapacity = sizeof(input.grpQueryGroupId.grpName);
        const auto strcpyRet = strcpy_s(input.grpQueryGroupId.grpName, nameCapacity, grpName.c_str());
        if (strcpyRet != EOK) {
            ACL_LOG_INNER_ERROR("[strcpy]copy group name to input failed, result = %d.", strcpyRet);
            return ACL_ERROR_FAILURE;
        }
        rtMemGrpQueryGroupIdInfo_t outputInfo = {};
        rtMemGrpQueryOutput_t output = {};
        output.groupIdInfo = &outputInfo;

        ACL_REQUIRES_CALL_RTS_OK(rtMemGrpQuery(&input, &output), rtMemGrpQuery);
        qsGroupId_ = output.groupIdInfo->groupId;
        ACL_LOG_INFO("This groupId is %d, name is %s", qsGroupId_, grpName.c_str());
        return ACL_SUCCESS;
    }

    /**
     * @brief Allocate an mbuf of @p size bytes from group qsGroupId_, initialising the mbuf module on
     *        first use.
     * @param size  requested buffer size, passed to rtMbufAllocEx
     * @param type  allocation type, passed to rtMbufAllocEx
     * @param buf   receives the new buffer handle, must not be null
     * @return ACL_SUCCESS on success, otherwise the error of the null check or the runtime error code of
     *         rtMbufInit / rtMbufAllocEx
     */
    aclError QueueProcessor::acltdtAllocBufData(const size_t size, const uint32_t type, acltdtBuf *const buf)
    {
        // output pointer is validated here like in the sibling buffer functions
        ACL_REQUIRES_NOT_NULL(buf);
        rtError_t ret;
        if (!isMbufInit_) {
            rtMemBuffCfg_t cfg = {{}};
            ret = rtMbufInit(&cfg);
            // a repeated init is accepted: the module is usable in both cases
            if ((ret != ACL_RT_SUCCESS) && (ret != ACL_ERROR_RT_REPEATED_INIT)) {
                ACL_LOG_INNER_ERROR("mbuf init failed, ret is %d", ret);
                return ret;
            }
            isMbufInit_ = true;
        }
        ret = rtMbufAllocEx(buf, size, type, qsGroupId_);
        if (ret != RT_ERROR_NONE) {
            ACL_LOG_CALL_ERROR("[Alloc][mbuf]fail to alloc mbuf result = %d", ret);
            return ret;
        }
        return ACL_SUCCESS;
    }

    /**
     * @brief Release an mbuf through rtMbufFree.
     * @param buf  buffer handle to free, must not be null; it is passed by value, so the caller's copy
     *             of the handle is not modified
     * @return ACL_SUCCESS on success, otherwise the runtime error code of rtMbufFree
     */
    aclError QueueProcessor::acltdtFreeBuf(acltdtBuf buf)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        const rtError_t rtRet = rtMbufFree(buf);
        if (rtRet != RT_ERROR_NONE) {
            ACL_LOG_CALL_ERROR("[Free][mbuf]fail to free mbuf result = %d", rtRet);
            return rtRet;
        }
        return ACL_SUCCESS;
    }

    /**
     * @brief Set the data length of an mbuf through rtMbufSetDataLen.
     * @param buf  buffer handle, must not be null
     * @param len  data length forwarded to the runtime
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtSetBufDataLen(const acltdtBuf buf, const size_t len)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_CALL_RTS_OK(rtMbufSetDataLen(buf, len), rtMbufSetDataLen);
        return ACL_SUCCESS;
    }

    /**
     * @brief Read the data length of an mbuf through rtMbufGetDataLen.
     * @param buf  buffer handle, must not be null
     * @param len  receives the data length, must not be null; written only on success
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtGetBufDataLen(const acltdtBuf buf, size_t *const len)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(len);
        // the runtime reports a 64-bit length; it is converted to size_t for the caller
        uint64_t rawLen = 0U;
        ACL_REQUIRES_CALL_RTS_OK(rtMbufGetDataLen(buf, &rawLen), rtMbufGetDataLen);
        *len = static_cast<size_t>(rawLen);
        return ACL_SUCCESS;
    }

    /**
     * @brief Obtain the data address and the buffer size of an mbuf.
     * @param buf      buffer handle, must not be null
     * @param dataPtr  receives the address reported by rtMbufGetBuffAddr, must not be null
     * @param size     receives the size reported by rtMbufGetBuffSize, must not be null
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime checks
     */
    aclError QueueProcessor::acltdtGetBufData(const acltdtBuf buf, void **const dataPtr, size_t *const size)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(dataPtr);
        ACL_REQUIRES_NOT_NULL(size);
        // address first, then size; *size is only written when both queries succeed
        ACL_REQUIRES_CALL_RTS_OK(rtMbufGetBuffAddr(buf, dataPtr), rtMbufGetBuffAddr);
        uint64_t rawSize = 0U;
        ACL_REQUIRES_CALL_RTS_OK(rtMbufGetBuffSize(buf, &rawSize), rtMbufGetBuffSize);
        *size = static_cast<size_t>(rawSize);
        return ACL_SUCCESS;
    }

    /**
     * @brief Copy @p size bytes starting at @p offset of the mbuf private (user data) area into @p dataPtr.
     *
     * The range [offset, offset + size) has to fit both into MEM_SIZE_MAX and into the private area size
     * reported by rtMbufGetPrivInfo.
     *
     * @param buf      buffer handle, must not be null
     * @param dataPtr  destination with room for at least @p size bytes, must not be null
     * @param size     number of bytes to copy
     * @param offset   byte offset inside the private area
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when memcpy_s fails, otherwise the error produced
     *         by the parameter or runtime checks
     */
    aclError QueueProcessor::acltdtGetBufUserData(const acltdtBuf buf, void *dataPtr,
                                                  const size_t size, const size_t offset)
    {
        // The current default private data area size is 96B, if offset+size exceeds 96, an error is reported.
        // size and offset are bounded one after the other so that a wrapped (size + offset) cannot pass.
        ACL_CHECK_LESS_UINT(size, static_cast<size_t>(MEM_SIZE_MAX));
        ACL_CHECK_LESS_UINT(offset, (static_cast<size_t>(MEM_SIZE_MAX) - size));
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(dataPtr);
        uint64_t bufSize = 0U;
        void *tmpDataPtr = nullptr;
        ACL_REQUIRES_CALL_RTS_OK(rtMbufGetPrivInfo(buf, &tmpDataPtr, &bufSize), rtMbufGetPrivInfo);
        // the whole range, not just size, has to lie inside the area the runtime actually reports
        const size_t privSize = static_cast<size_t>(bufSize);
        ACL_CHECK_LESS_UINT(size, privSize);
        ACL_CHECK_LESS_UINT(offset, (privSize - size));
        ACL_REQUIRES_NOT_NULL(tmpDataPtr);
        const auto ret = memcpy_s(dataPtr, size, (static_cast<uint8_t *>(tmpDataPtr) + offset), size);
        if (ret != EOK) {
            ACL_LOG_INNER_ERROR("call memcpy_s failed, result = %d, size = %zu, bufSize = %lu, offset = %zu",
                                ret, size, bufSize, offset);
            return ACL_ERROR_FAILURE;
        }
        return ACL_SUCCESS;
    }

    /**
     * @brief Copy @p size bytes from @p dataPtr into the mbuf private (user data) area at @p offset.
     *
     * The range [offset, offset + size) has to fit both into MEM_SIZE_MAX and into the private area size
     * reported by rtMbufGetPrivInfo.
     *
     * @param buf      buffer handle, must not be null
     * @param dataPtr  source holding at least @p size bytes, must not be null
     * @param size     number of bytes to copy
     * @param offset   byte offset inside the private area
     * @return ACL_SUCCESS on success, ACL_ERROR_FAILURE when memcpy_s fails, otherwise the error produced
     *         by the parameter or runtime checks
     */
    aclError QueueProcessor::acltdtSetBufUserData(acltdtBuf buf, const void *dataPtr,
                                                  const size_t size, const size_t offset)
    {
        // The current default private data area size is 96B, if offset+size exceeds 96, an error is reported.
        // size and offset are bounded one after the other so that a wrapped (size + offset) cannot pass.
        ACL_CHECK_LESS_UINT(size, static_cast<size_t>(MEM_SIZE_MAX));
        ACL_CHECK_LESS_UINT(offset, (static_cast<size_t>(MEM_SIZE_MAX) - size));
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(dataPtr);
        uint64_t bufSize = 0U;
        void *tmpDataPtr = nullptr;
        ACL_REQUIRES_CALL_RTS_OK(rtMbufGetPrivInfo(buf, &tmpDataPtr, &bufSize), rtMbufGetPrivInfo);
        // the whole range has to lie inside the reported area; this also keeps (privSize - offset)
        // below from underflowing
        const size_t privSize = static_cast<size_t>(bufSize);
        ACL_CHECK_LESS_UINT(size, privSize);
        ACL_CHECK_LESS_UINT(offset, (privSize - size));
        ACL_REQUIRES_NOT_NULL(tmpDataPtr);
        const auto ret = memcpy_s((static_cast<uint8_t *>(tmpDataPtr) + offset),
                                  (privSize - offset),
                                  dataPtr, size);
        if (ret != EOK) {
            ACL_LOG_INNER_ERROR("call memcpy_s failed, result = %d, size = %zu, bufSize = %lu, offset = %zu",
                                ret, size, bufSize, offset);
            return ACL_ERROR_FAILURE;
        }
        return ACL_SUCCESS;
    }

    /**
     * @brief Create a buffer reference from @p buf through rtMbufCopyBufRef.
     * @param buf     source buffer handle, must not be null
     * @param newBuf  receives the handle produced by the runtime, must not be null
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtCopyBufRef(const acltdtBuf buf, acltdtBuf *const newBuf)
    {
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(newBuf);
        ACL_REQUIRES_CALL_RTS_OK(rtMbufCopyBufRef(buf, newBuf), rtMbufCopyBufRef);
        return ACL_SUCCESS;
    }

    /**
     * @brief Append @p buf to the mbuf chain headed by @p headBuf through rtMbufChainAppend.
     * @param headBuf  head of the chain, must not be null
     * @param buf      buffer to append, must not be null
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtAppendBufChain(const acltdtBuf headBuf, const acltdtBuf buf)
    {
        // buf is validated before headBuf; with both null the error refers to buf
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(headBuf);
        ACL_REQUIRES_CALL_RTS_OK(rtMbufChainAppend(headBuf, buf), rtMbufChainAppend);
        return ACL_SUCCESS;
    }

    /**
     * @brief Query the number of mbufs in the chain headed by @p headBuf through rtMbufChainGetMbufNum.
     * @param headBuf  head of the chain, must not be null
     * @param num      receives the number of buffers, must not be null
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtGetBufChainNum(const acltdtBuf headBuf, uint32_t *const num)
    {
        ACL_REQUIRES_NOT_NULL(headBuf);
        ACL_REQUIRES_NOT_NULL(num);
        // the second macro argument names the interface that was actually called
        ACL_REQUIRES_CALL_RTS_OK(rtMbufChainGetMbufNum(headBuf, num), rtMbufChainGetMbufNum);
        return ACL_SUCCESS;
    }

    /**
     * @brief Fetch the mbuf at position @p index of the chain headed by @p headBuf through
     *        rtMbufChainGetMbuf.
     * @param headBuf  head of the chain, must not be null
     * @param index    position inside the chain, forwarded to the runtime unchanged
     * @param buf      receives the buffer handle, must not be null
     * @return ACL_SUCCESS on success, otherwise the error produced by the parameter or runtime check
     */
    aclError QueueProcessor::acltdtGetBufFromChain(const acltdtBuf headBuf, const uint32_t index, acltdtBuf *const buf)
    {
        // buf is validated before headBuf; with both null the error refers to buf
        ACL_REQUIRES_NOT_NULL(buf);
        ACL_REQUIRES_NOT_NULL(headBuf);
        ACL_REQUIRES_CALL_RTS_OK(rtMbufChainGetMbuf(headBuf, index, buf), rtMbufChainGetMbuf);
        return ACL_SUCCESS;
    }

    /**
     * @brief Return the mutex pair registered for queue @p qid, creating and registering it on first use.
     * @param qid  queue id used as the key into muForQueue_
     * @return shared pointer to the QueueDataMutex stored for @p qid
     */
    QueueDataMutexPtr QueueProcessor::GetMutexForData(const uint32_t qid)
    {
        // muForQueueMap_ serialises every access to muForQueue_
        const std::lock_guard<std::mutex> lk(muForQueueMap_);
        auto found = muForQueue_.find(qid);
        if (found == muForQueue_.end()) {
            // first request for this queue: create the entry before handing it out
            found = muForQueue_.emplace(qid, std::make_shared<QueueDataMutex>()).first;
        }
        return found->second;
    }

    /**
     * @brief Remove the mutex pair registered for queue @p qid, if any.
     * @param qid  queue id used as the key into muForQueue_
     */
    void QueueProcessor::DeleteMutexForData(const uint32_t qid)
    {
        // muForQueueMap_ serialises every access to muForQueue_
        const std::lock_guard<std::mutex> lk(muForQueueMap_);
        // erasing by key is a no-op when nothing is registered for qid
        (void)muForQueue_.erase(qid);
    }

    uint64_t QueueProcessor::GetTimestamp() const
    {
        mmTimeval tv{};
        const auto ret = mmGetTimeOfDay(&tv, nullptr);
        if (ret != EN_OK) {
            ACL_LOG_WARN("Func mmGetTimeOfDay did not return success, errorCode = %d", ret);
        }
        // 1000000: seconds to microseconds
        const uint64_t totalUseTime = static_cast<size_t>(tv.tv_usec) +
            (static_cast<uint64_t>(tv.tv_sec) * 1000000UL);
        return totalUseTime;
    }

    /**
     * @brief Determine the device id to use for queue operations.
     *
     * In ACL_HOST run mode the id is obtained from rtGetDevice; in any other run mode it stays 0.
     *
     * @param deviceId  receives the device id; set to 0 before anything else is done
     * @return ACL_SUCCESS on success, otherwise the error of aclrtGetRunMode or rtGetDevice
     */
    aclError QueueProcessor::GetDeviceId(int32_t& deviceId) const
    {
        deviceId = 0;
        aclrtRunMode aclRunMode = ACL_HOST;
        const aclError getRunModeRet = aclrtGetRunMode(&aclRunMode);
        if (getRunModeRet != ACL_SUCCESS) {
            ACL_LOG_CALL_ERROR("[Get][RunMode]get run mode failed, errorCode = %d.", getRunModeRet);
            return getRunModeRet;
        }

        if (aclRunMode != ACL_HOST) {
            // keep the default of 0 outside host mode
            return ACL_SUCCESS;
        }
        const rtError_t rtRet = rtGetDevice(&deviceId);
        if (rtRet == ACL_SUCCESS) {
            return ACL_SUCCESS;
        }
        ACL_LOG_CALL_ERROR("[Get][DeviceId]fail to get deviceId errorCode = %d", rtRet);
        return rtRet;
    }

    /**
     * @brief Enqueue one data buffer, plus optional user data, into queue @p qid through
     *        rtMemQueueEnQueueBuff.
     * @param qid           id of the target queue
     * @param data          payload to enqueue, must not be null
     * @param dataSize      payload size, checked with ACL_REQUIRES_POSITIVE
     * @param userData      passed to the runtime as context address; not checked here
     * @param userDataSize  passed to the runtime as context length
     * @param timeout       forwarded to rtMemQueueEnQueueBuff unchanged
     * @param rsv           only logged
     * @return ACL_SUCCESS on success, otherwise the error of the parameter checks, of GetDeviceId, or the
     *         runtime error code (including ACL_ERROR_RT_QUEUE_FULL)
     */
    aclError QueueProcessor::acltdtEnqueueData(const uint32_t qid, const void *const data, const size_t dataSize,
        const void *const userData, const size_t userDataSize, const int32_t timeout, const uint32_t rsv)
    {
        ACL_LOG_INFO("Start to enqueue data qid is %u, dataSize is %zu, userDataSize is %zu, "
                     "timeout is %d, rsv is %u", qid, dataSize, userDataSize, timeout, rsv);
        ACL_REQUIRES_NOT_NULL(data);
        ACL_REQUIRES_POSITIVE(dataSize);

        int32_t deviceId;
        ACL_REQUIRES_OK(GetDeviceId(deviceId));
        const QueueDataMutexPtr muPtr = GetMutexForData(qid);
        ACL_CHECK_MALLOC_RESULT(muPtr);
        // same per-queue lock as acltdtEnqueue; held until this function returns
        const std::lock_guard<std::mutex> guard(muPtr->muForDequeue);

        // describe the single payload buffer and the user context for the runtime
        rtMemQueueBuffInfo payloadInfo = {const_cast<void*>(data), dataSize};
        rtMemQueueBuff_t request = {nullptr, 0U, nullptr, 0U};
        request.contextAddr = const_cast<void*>(userData);
        request.contextLen = userDataSize;
        request.buffInfo = &payloadInfo;
        request.buffCount = 1U;

        const rtError_t enqueueRet = rtMemQueueEnQueueBuff(deviceId, qid, &request, timeout);
        if (enqueueRet == ACL_ERROR_RT_QUEUE_FULL) {
            // a full queue is reported to the caller but only logged at info level
            ACL_LOG_INFO("queue is full, device is %d, qid is %u", deviceId, qid);
            return enqueueRet;
        }
        if (enqueueRet != RT_ERROR_NONE) {
            ACL_LOG_INNER_ERROR("Fail to execute rtMemQueueEnQueueBuff, device is %d, qid is %u", deviceId, qid);
            return enqueueRet;
        }

        ACL_LOG_INFO("success to execute acltdtEnqueueData, device is %d, qid is %u", deviceId, qid);
        return ACL_SUCCESS;
    }

    /**
     * @brief Dequeue one data buffer, plus optional user data, from queue @p qid.
     *
     * rtMemQueuePeek is called first and fills @p retDataSize; rtMemQueueDeQueueBuff then copies the
     * element into the caller's buffers.
     *
     * @param qid           id of the source queue
     * @param data          destination for the payload, must not be null
     * @param dataSize      capacity of @p data, checked with ACL_REQUIRES_POSITIVE
     * @param retDataSize   receives the size reported by rtMemQueuePeek, must not be null
     * @param userData      passed to the runtime as context address; not checked here
     * @param userDataSize  passed to the runtime as context length
     * @param timeout       forwarded unchanged to both runtime calls
     * @return ACL_SUCCESS on success, otherwise the error of the parameter checks, of GetDeviceId, or the
     *         runtime error code (including ACL_ERROR_RT_QUEUE_EMPTY)
     */
    aclError QueueProcessor::acltdtDequeueData(const uint32_t qid, void *const data, const size_t dataSize,
        size_t *const retDataSize, void *const userData, const size_t userDataSize, const int32_t timeout)
    {
        ACL_LOG_INFO("Start to dequeue data qid is %u, dataSize is %zu, userDataSize is %zu, "
                     "timeout is %d,", qid, dataSize, userDataSize, timeout);
        ACL_REQUIRES_NOT_NULL(data);
        ACL_REQUIRES_NOT_NULL(retDataSize);
        ACL_REQUIRES_POSITIVE(dataSize);

        int32_t deviceId;
        ACL_REQUIRES_OK(GetDeviceId(deviceId));

        const QueueDataMutexPtr muPtr = GetMutexForData(qid);
        ACL_CHECK_MALLOC_RESULT(muPtr);
        // same per-queue lock as acltdtDequeue; held across peek and dequeue
        const std::lock_guard<std::mutex> guard(muPtr->muForEnqueue);

        const rtError_t peekRet = rtMemQueuePeek(deviceId, qid, retDataSize, timeout);
        if (peekRet == ACL_ERROR_RT_QUEUE_EMPTY) {
            ACL_LOG_INFO("queue is empty, device is %d, qid is %u", deviceId, qid);
            return peekRet;
        }
        if (peekRet != RT_ERROR_NONE) {
            ACL_LOG_ERROR("peek queue [%u] failed, device is %d", qid, deviceId);
            return peekRet;
        }

        // describe the single destination buffer and the user context for the runtime
        rtMemQueueBuffInfo payloadInfo = {data, dataSize};
        rtMemQueueBuff_t request = {nullptr, 0U, nullptr, 0U};
        request.contextAddr = userData;
        request.contextLen = userDataSize;
        request.buffInfo = &payloadInfo;
        request.buffCount = 1U;

        const rtError_t dequeueRet = rtMemQueueDeQueueBuff(deviceId, qid, &request, timeout);
        if (dequeueRet == ACL_ERROR_RT_QUEUE_EMPTY) {
            ACL_LOG_INFO("queue is empty, device is %d, qid is %u", deviceId, qid);
            return dequeueRet;
        }

        if (dequeueRet != RT_ERROR_NONE) {
            ACL_LOG_ERROR("failed to rtMemQueueDeQueueBuf, device is %d, qid is %u", deviceId, qid);
            return dequeueRet;
        }

        ACL_LOG_INFO("success to execute acltdtDequeueData, device is %d, qid is %u, retDataSize is %zu",
            deviceId, qid, *retDataSize);
        return ACL_SUCCESS;
    }

    /**
     * @brief Fill @p attr with the default queue attributes: zeroed name, depth RT_MQ_DEPTH_DEFAULT,
     *        work mode RT_MQ_MODE_DEFAULT, flow control and overwrite disabled.
     * @param attr  attribute struct to initialise
     */
    void QueueProcessor::acltdtSetDefaultQueueAttr(acltdtQueueAttr &attr)
    {
        // clear the whole name buffer
        (void)memset_s(attr.name, static_cast<size_t>(RT_MQ_MAX_NAME_LEN), 0, sizeof(attr.name));
        attr.workMode = static_cast<uint32_t>(RT_MQ_MODE_DEFAULT);
        attr.depth = RT_MQ_DEPTH_DEFAULT;
        attr.overWriteFlag = false;
        attr.flowCtrlFlag = false;
        attr.flowCtrlDropTime = 0U;
    }

    /**
     * @brief Create a queue on @p deviceId, using the default attributes when @p attr is null.
     * @param deviceId  device the queue is created on
     * @param attr      queue attributes; nullptr selects the values of acltdtSetDefaultQueueAttr
     * @param qid       receives the id of the new queue, forwarded to rtMemQueueCreate
     * @return ACL_SUCCESS on success, otherwise the error produced for a failed rtMemQueueCreate
     */
    aclError QueueProcessor::acltdtCreateQueueWithAttr(const int32_t deviceId, const acltdtQueueAttr *const attr,
        uint32_t *const qid) const
    {
        if (attr == nullptr) {
            // value-initialised so that any field the default setter does not write is zero, not indeterminate
            acltdtQueueAttr tmpAttr{};
            acltdtSetDefaultQueueAttr(tmpAttr);
            ACL_REQUIRES_CALL_RTS_OK(rtMemQueueCreate(deviceId, &tmpAttr, qid), rtMemQueueCreate);
        } else {
            ACL_REQUIRES_CALL_RTS_OK(rtMemQueueCreate(deviceId, attr, qid), rtMemQueueCreate);
        }
        return ACL_SUCCESS;
    }
}
